package org.hope.lee.consumer.queue;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 *   
 *  @ProjectName: base-project 
 *  @Description: 顺序消费 生产者
 *  操作步骤:
 *  1.先启动ConsumerQueue1,然后再启动ConsumerQueue2,最后启动ProducerQueue
 *  2.两个ConsumerQueue的 group_name 必须一样，然后就能看到负载均衡的顺序消费
 *   解释:
 *   默认的发送会随机指定一个队列
 */
public class ProducerQueue {
    public static void main(String[] args) {
        String group_name = "order_producer";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        try {
            producer.start();
//            String[] tags = new String[] {"TagA", "TagC", "TagD"};
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            /**
             * 第一个队列
             * 通过  public SendResult send(Message msg, MessageQueueSelector selector, Object arg)来指定发送消息到哪个队列
             */
            for(int i = 1; i <= 5; i++) {
                String body = dateStr + "body_1_" + i;
                Message message = new Message("TopicTest", "order1", "KEY" + i, body.getBytes());
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 0); //0是队列的下标
                System.out.println(sendResult + ", body:" + body);
            }

            /**
             * 第二个队列
             */
            for(int i = 1; i <= 5; i++) {
                //时间戳
                String body = dateStr + "order_2" + i;
                Message message = new Message("TopicTest", "body_2_", "KEY" + i, body.getBytes());
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 1); //1是队列的下标
                System.out.println(sendResult + ", body:" + body);
            }
            /**
             * 第三个队列
             */
            for(int i = 1; i <= 5; i++) {
                //时间戳
                String body = dateStr + "order_3" + i;
                Message message = new Message("TopicTest", "body_3_", "KEY" + i, body.getBytes());
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        return list.get(id);
                    }
                }, 2); //2是队列的下标
                System.out.println(sendResult + ", body:" + body);
            }

            producer.shutdown();

        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        }
    }
}
